Moju Kapu モジュカプ

How mirai and crew Are Powering the Next Generation of Parallel Computing in R

Charlie Gao and Will Landau

Hibiki AI Limited, Eli Lilly and Company

2024-07-09

moju-kapu (モジュカプ) is shorthand for modular encapsulation (モジュラーカプセル化)

  • Balance

  • Effective stand-alone tools ↔ entire integrated systems

  • Natural limits of a package

  • Interfaces for developers as well as end-users

  • Layered engineering approach

The Back Story

  • Why targets needed crew

  • What was missing for crew to become feasible

  • How mirai provided the solution

The Back Story (Cont’d)

  • Feb 2023 - CG/WL collaboration starts

  • Mar 2023 - initial mirai backend for crew

  • Apr 2023 - targets 1.0.0 with crew integration

  • Jul 2023 - TLS lands in mirai and crew

  • Oct 2023 - mirai implements parallel backend for R

  • Dec 2023 - mirai serialization initial support for torch

  • Mar 2024 - mirai serialization supports ADBC database hosting

  • May 2024 - mirai 1.0.0 - implements next-gen promises

mirai

みらい       /mɪˈrɑːiː/

  1. future

Minimalist Async Evaluation Framework for R
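The core API is deliberately small. A minimal sketch of the basic workflow, assuming mirai is installed (the daemon count here is arbitrary):

```r
library(mirai)

daemons(2)                # launch two persistent background daemons
m <- mirai(Sys.getpid())  # returns immediately; evaluated in a daemon
pid <- m[]                # wait for and collect the result
daemons(0)                # reset: shut down the daemons

# 'pid' is the process ID of the daemon, not of this session.
```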

mirai - Designed for Production

  1. High Performance: 100x faster
  2. Simple and Robust: WYSIWYG concept
  3. Massively Scalable: one million promises

External Encapsulation

R parallel   An alternative communications backend for R, implementing a request by R-Core at R Project Sprint 2023

promises   Next generation completely event-driven promises

Shiny   Asynchronous parallel / distributed backend

Plumber   Asynchronous parallel / distributed backend

Arrow   Host ADBC database connections in daemon processes

torch   Seamless cross-process use of Torch tensors and models
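The promises integration means a mirai can be used directly wherever a promise is expected, including in Shiny and Plumber. A minimal sketch, assuming mirai (>= 1.0.0) and promises are installed; the `%...>%` operator is exported by promises:

```r
library(mirai)
library(promises)

daemons(1)

# promises calls as.promise() on the mirai, so it slots straight
# into a promise chain; the chain resolves when the daemon returns.
p <- mirai({ Sys.sleep(1); "done" }) %...>% toupper() %...>% print()

# In an interactive session the later event loop prints "DONE"
# once the task completes; call daemons(0) to reset when finished.
```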

crew

crew: Moju Kapu for mirai

  • Motivation: auto-scaled parallel computing for targets.
  • Encapsulation: R6 interface for mirai tasks and workers.
  • Modularity: plugins for different computing environments.

Encapsulation: R6 classes to wrap mirai



# Start a new controller.
x <- crew::crew_controller_local(
  workers = 10,
  seconds_idle = 30
)

# Submit many parallel tasks.
x$walk(
  rnorm(1, x),
  iterate = list(x = seq_len(1000))
)

# Optional: wait for all tasks.
x$wait(mode = "all")

# Collect results so far.
str(unlist(x$collect()$result))
#> num [1:1000] 3.2 4.1 2.31 ...

Modularity: crew plugins

crew_controller_local()


crew_controller_slurm(
  slurm_memory_gigabytes_per_cpu = 16,
  script_lines = "module load R/4.4.0"
)
crew_controller_aws_batch(
  aws_batch_job_definition = "your_def",
  aws_batch_job_queue = "your_queue"
)
your_custom_controller(...)

https://wlandau.github.io/crew/articles/plugins.html

targets accepts any crew controller

tar_option_set(
  controller = crew_controller_aws_batch(
    workers = 3,
    seconds_idle = 60,
    aws_batch_job_definition = "your_def",
    aws_batch_job_queue = "your_queue",
    aws_batch_region = "us-east-2"
  )
)
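Once the controller is set, targets uses it transparently: each outdated target (or dynamic branch) is submitted to crew as a task. A minimal `_targets.R` sketch, using a local controller for illustration:

```r
# _targets.R
library(targets)

tar_option_set(
  controller = crew::crew_controller_local(
    workers = 2,
    seconds_idle = 10
  )
)

list(
  tar_target(x, seq_len(4)),
  tar_target(y, x^2, pattern = map(x))  # each branch runs as a crew task
)
```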

Appendix

1. mirai Design Concepts

100x Faster

Setup:

library(mirai)
library(future)

d <- daemons(1, dispatcher = FALSE)
plan("multisession", workers = 1)

m <- mirai(1)
collect_mirai(m)
#> [1] 1

f <- future(1)
value(f)
#> [1] 1

Created on 2024-05-27 with reprex v2.1.0

100x Faster (Cont’d)

Benchmarking:

bench::mark(mirai(1), future(1), relative = TRUE, check = FALSE)
#> # A tibble: 2 × 6
#>   expression   min median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr> <dbl>  <dbl>     <dbl>     <dbl>    <dbl>
#> 1 mirai(1)      1      1       74.6      1        1   
#> 2 future(1)   158.   113.       1        5.72     2.75

bench::mark(collect_mirai(m), value(f), relative = TRUE)
#> # A tibble: 2 × 6
#>   expression         min median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr>       <dbl>  <dbl>     <dbl>     <dbl>    <dbl>
#> 1 collect_mirai(m)   1      1        84.1       Inf      NaN
#> 2 value(f)          79.3   89.0       1         NaN      Inf

Created on 2024-05-27 with reprex v2.1.0

WYSIWYG Concept

Production usage requires ‘correctness’ over ‘convenience’.

Code behaves as written. There is no reliance on non-transparent static code analysis, which can result in inefficient behaviour, or even fail due to hidden global options:

library(mirai)
library(future)
df <- list(a = double(1e8), b = 1)

m <- mirai(2 * x, x = df$b)
m[]
#> [1] 2

f <- future(2 * df$b)
#> Error in getGlobalsAndPackages(expr, envir = envir, tweak =
#> tweakExpression, : The total size of the 1 globals exported for
#> future expression ('2 * df$b') is 762.94 MiB.. This exceeds the
#> maximum allowed size of 500.00 MiB (option 'future.globals.maxSize')
#> . There is one global: 'df' (762.94 MiB of class 'list')
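The workaround illustrates the point about hidden global options: raising `future.globals.maxSize` lets the future proceed, but at the cost of copying the entire ~763 MiB object to the worker. A self-contained sketch:

```r
library(future)

df <- list(a = double(1e8), b = 1)

# Raise the hidden global option so the ~763 MiB export is allowed.
options(future.globals.maxSize = 1e9)

f <- future(2 * df$b)  # succeeds, but copies all of 'df' to the worker
value(f)
#> [1] 2
```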

One Million Promises

library(mirai)
daemons(8, dispatcher = FALSE)
#> [1] 8

r <- 0
start <- Sys.time()
m <- mirai_map(1:1000000, \(x) x, .promise = \(x) r <<- r + x)
Sys.time() - start
#> Time difference of 6.42396 mins

later::run_now()
r
#> [1] 500000500000

Created on 2024-05-27 with reprex v2.1.0
Running on an Intel i7 Gen 11 notebook with 16GB RAM.